{
 "cells": [
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Creating a Custom Executor\n",
    "\n",
    "Executors define how a backend resource handles computations. They specify everything about the resource: the hardware and configuration, the computation strategy, logic, and even goals.\n",
    "\n",
    "Executors are plugins. Any executor plugins found by the dispatcher are imported as classes in the `covalent.executor` name space.\n",
    "\n",
    "Covalent already contains a number of versatile executors. (See [Choosing an Executor For a Task](choosing_executors.ipynb) for information about choosing an existing executor.) \n",
    "\n",
    "If an existing executor does not fit your needs, you can write your own, using your choice of environments, hardware, and cloud resources to execute Covalent electrons however you like. A template to write an executor can be found [here](https://github.com/AgnostiqHQ/covalent-executor-template).\n",
    "\n",
    "### Prerequisites\n",
    "\n",
    "Decide the purpose of the executor. You should have a good handle on the following questions:\n",
    "- What is the purpose of the executor? \n",
    "- What types of tasks is it designed to run?\n",
    "- What capabilities does the executor require that aren't already in an existing executor?\n",
    "- What hardware or cloud resource will it run on?\n",
    "- Will it scale? How?\n"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Procedure\n",
    "\n",
    "The following example creates a `TimingExecutor` that computes the CPU time used by the function to help determine its efficiency. It then writes this result to a file along with its `dispatch_id` and `node_id`.\n",
    "\n",
    "1. Decide whether to make your executor asynchronous.\n",
    "\n",
    "Covalent is written to be capable of running asynchronous (async) executors. In general, Covalent suggests that you write your custom executors to be async-capable as well, especially if it depends on network communication or has I/O-bound logic inside the `run()` function. \n",
    "\n",
    "Some examples of async executors are: \n",
    "- The default [DaskExecutor](https://github.com/AgnostiqHQ/covalent/blob/develop/covalent/executor/executor_plugins/dask.py)\n",
    "- [SSHPlugin](https://github.com/AgnostiqHQ/covalent-ssh-plugin)\n",
    "- [SlurmPlugin](https://github.com/AgnostiqHQ/covalent-slurm-plugin).\n",
    "\n",
    "To make your executor async-capable, do the following:\n",
    "\n",
    "    i. Subclass `AsyncBaseExecutor` instead of `BaseExecutor`\n",
    "    ii. Define your `run()` function with:\n",
    "\n",
    "        `async def run(...)`\n",
    "    \n",
    "        instead of \n",
    "    \n",
    "        `def run(...)`\n",
    "\n",
    "2. Import the Covalent `BaseExecutor` (or `AsyncBaseExecutor`) and Python `typing` libraries."
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "3. Write the plugin class. The class must contain:\n",
    "\n",
    "- `executor_plugin_name`, name of the executor to make it importable by `covalent.executors` - must be the same as its class name\n",
    "- The default values for the plugin parameters defined in `_EXECUTOR_PLUGIN_DEFAULTS`.\n",
    "- A `run()` function that handles the task to be executed. The `run()` function must take these parameters:\n",
    "    - A `Callable` object to contain the task;\n",
    "    - A list of arguments (`args`) and a dictionary of keyword arguments (`kwargs`) to pass to the `Callable`.\n",
    "    - A dictionary, `task_metadata`, to store the `dispatch_id` and `node_id` (and possibly other metadata in the future).\n",
    "- `_EXECUTOR_PLUGIN_DEFAULTS`, if there are any defaults for the executor.\n",
    "\n",
    "With all the above in mind, the example `TimingExecutor` class looks like this:"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Since we are not planning on having any defaults, we can skip adding `_EXECUTOR_PLUGIN_DEFAULTS` here.\n",
    "Ideally any collection of custom executors should reside in a separate directory, so let's create a new dir for our plugin:\n",
    "\n",
    "In a terminal window:\n",
    "\n",
    "```bash\n",
    "mkdir custom_executors\n",
    "```\n",
    "\n",
    "And copy the following snippet in a file `timing_plugin.py` in that directory:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [],
   "source": [
    "\"\"\"Timing executor plugin for Covalent.\"\"\"\n",
    "\n",
    "import time\n",
    "from pathlib import Path\n",
    "from typing import Any, Callable, Dict, List\n",
    "\n",
    "from covalent.executor.base import BaseExecutor\n",
    "\n",
    "executor_plugin_name = \"TimingExecutor\"  # Required by covalent.executors\n",
    "\n",
    "_EXECUTOR_PLUGIN_DEFAULTS = {\n",
    "    \"timing_filepath\": \"\"\n",
    "}  # Set default values for executor plugin parameters here\n",
    "\n",
    "\n",
    "class TimingExecutor(BaseExecutor):\n",
    "    \"\"\"Executor that times the execution time.\"\"\"\n",
    "\n",
    "    def __init__(self, timing_filepath: str = \"\", **kwargs):\n",
    "        \"\"\"Init function.\n",
    "\n",
    "        Args:\n",
    "            timing_filepath: Filepath where the timing information will be written.\n",
    "\n",
    "        \"\"\"\n",
    "        self.timing_filepath = str(Path(timing_filepath).resolve())\n",
    "        super().__init__(**kwargs)\n",
    "\n",
    "    def run(self, function: Callable, args: List, kwargs: Dict, task_metadata: Dict) -> Any:\n",
    "        \"\"\"Measures the time taken to execute a function.\n",
    "\n",
    "        Args:\n",
    "            function: Function to be executed.\n",
    "            args: Arguments to be passed to the function.\n",
    "            kwargs: Keyword arguments to be passed to the function.\n",
    "            task_metadata: Metadata about the task. Expects node_id and dispatch_id.\n",
    "\n",
    "        Returns:\n",
    "            The result of the function.\n",
    "\n",
    "        \"\"\"\n",
    "        start = time.process_time()\n",
    "\n",
    "        result = function(*args, **kwargs)\n",
    "\n",
    "        time_taken = time.process_time() - start\n",
    "\n",
    "        with open(f\"{self.timing_filepath}\", \"w\") as f:\n",
    "            f.write(\n",
    "                f\"Node {task_metadata['node_id']} in dispatch {task_metadata['dispatch_id']} took {time_taken}s of CPU time.\\n\"\n",
    "            )\n",
    "\n",
    "        return result\n"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now, assuming the `custom_executors` directory is in the same directory as this notebook, let's inform covalent on where to find the plugin:\n",
    "\n",
    "In a terminal window:\n",
    "\n",
    "```bash\n",
    "export COVALENT_EXECUTOR_DIR=$PWD/custom_executors\n",
    "covalent restart\n",
    "```"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "At this point the executor is ready for use.\n",
    "\n",
    "4. Construct electrons and assign them to the new executor, then execute them in a lattice:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "import covalent as ct\n",
    "import sys\n",
    "\n",
    "sys.path.append(str(Path(\"./custom_executors\").resolve()))\n",
    "from custom_executors import TimingExecutor\n",
    "\n",
    "\n",
    "timing_log = \"/tmp/cpu_timing.log\"\n",
    "timing_executor = TimingExecutor(timing_log)\n",
    "\n",
    "# Calculate e based on a series\n",
    "@ct.electron(executor=timing_executor)\n",
    "def e_ser(x):\n",
    "    e_est = 1\n",
    "    fact = 1\n",
    "    for i in range(1, x):\n",
    "        fact *= i\n",
    "        e_est += 1 / fact\n",
    "    return e_est\n",
    "\n",
    "\n",
    "@ct.lattice(executor=timing_executor)\n",
    "def workflow(x):\n",
    "    return e_ser(x)\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "5. Run the lattice:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n",
      "Lattice Result\n",
      "==============\n",
      "status: COMPLETED\n",
      "result: 2.7182815255731922\n",
      "input args: ['10']\n",
      "input kwargs: {}\n",
      "error: None\n",
      "\n",
      "start_time: 2023-04-14 11:19:44.967362\n",
      "end_time: 2023-04-14 11:19:45.108294\n",
      "\n",
      "results_dir: /home/user/.local/share/covalent/data\n",
      "dispatch_id: 35b9dda6-c4b8-4ec2-b820-67fb3ab606dc\n",
      "\n",
      "Node Outputs\n",
      "------------\n",
      "e_ser(0): 2.7182815255731922\n",
      ":parameter:10(1): 10\n",
      ":postprocess:(2): 2.7182815255731922\n",
      "\n",
      "Node 0 in dispatch 35b9dda6-c4b8-4ec2-b820-67fb3ab606dc took 0.00019649999999948875s of CPU time.\n",
      "\n"
     ]
    }
   ],
   "source": [
    "dispatch_id = ct.dispatch(workflow)(10)\n",
    "result = ct.get_result(dispatch_id, wait=True)\n",
    "print(result)\n",
    "\n",
    "for line in open(timing_log, \"r\"):\n",
    "    print(line)\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### See Also\n",
    "\n",
    "[Adding Constraints to Tasks and Workflows](./coding/add_constraints_to_lattice)\n",
    "\n",
    "[Choosing an Executor For a Task](choosing_executors.ipynb)\n",
    "    \n",
    "[Executor Template (GitHub)](https://github.com/AgnostiqHQ/covalent-executor-template)\n",
    "\n",
    "[DaskExecutor (GitHub)](https://github.com/AgnostiqHQ/covalent/blob/develop/covalent/executor/executor_plugins/dask.py)\n",
    "\n",
    "[SSHPlugin (GitHub)](https://github.com/AgnostiqHQ/covalent-ssh-plugin)\n",
    "\n",
    "[SlurmPlugin (GitHub)](https://github.com/AgnostiqHQ/covalent-slurm-plugin)"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.8.15"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
